package com.atguigu.flink0624.chapter11;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Static imports (none at present)

/**
 * Flink SQL example that uses the {@code filesystem} connector for both input and output.
 *
 * <p>A CSV-backed table {@code sensor} is declared over {@code input/sensor.txt}, a second
 * CSV-backed table {@code s1} is declared over {@code input/abcdef.txt}, and every row of
 * {@code sensor} whose {@code id} equals {@code 'sensor_1'} is inserted into {@code s1}.
 *
 * @Author lizhenchao@atguigu.cn
 * @Date 2021/11/19 10:26
 */
public class Flink05_Sql_File {
    /**
     * Declares the two file-backed tables and runs the filtering {@code INSERT} to completion.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the insert job fails or the wait for it is interrupted
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // Source: a dynamic table associated with a CSV file.
        tenv.executeSql("create table sensor(" +
                            "   id string, " +
                            "   ts bigint, " +
                            "   vc int " +
                            ")with(" +
                            "   'connector'='filesystem', " +
                            "   'path'='input/sensor.txt', " +
                            "   'format'='csv' " +
                            ")");

        // Sink: same schema as the source, written out as CSV under a different path.
        tenv.executeSql("create table s1(" +
                            "   id string, " +
                            "   ts bigint, " +
                            "   vc int " +
                            ")with(" +
                            "   'connector'='filesystem', " +
                            "   'path'='input/abcdef.txt', " +
                            "   'format'='csv' " +
                            ")");

        // The same pipeline can be expressed with the Table API:
        //   tenv.sqlQuery("select * from sensor where id='sensor_1'").executeInsert("s1");
        //
        // executeSql() submits an INSERT job asynchronously and returns right away, so wait
        // for the job to finish; otherwise main() may return (and a local run may shut down)
        // before the sink has been written.
        tenv.executeSql("insert into s1 " +
                            "select * " +
                            "from  default_catalog.default_database.sensor " +
                            "where id='sensor_1'")
            .await();
    }
}
